package cn.doitedu.rtmk.demo.demo4;

import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;


@Slf4j
@Getter
@Setter
public class RuleModel001Calculator implements RuleModelCalculator {

    RuleModel001ParamBean ruleParamBean;
    ListState<EventBean> con1State;
    ListState<EventBean> con2State;
    Table table;

    @Override
    public String getRuleId() {
        return ruleParamBean.getRuleId();
    }

    @Override
    public void init(String ruleParamJson, RuntimeContext runtimeContext) throws IOException {
        ruleParamBean = JSON.parseObject(ruleParamJson, RuleModel001ParamBean.class);

        // 创建动态画像所需要的 状态
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(5)).useProcessingTime().build();
        ListStateDescriptor<EventBean> con1StateDesc =
                new ListStateDescriptor<>("con1_state", EventBean.class);
        con1StateDesc.enableTimeToLive(ttlConfig);
        con1State = runtimeContext.getListState(con1StateDesc);

        // 创建动态画像所需要的 状态
        StateTtlConfig ttlConfig2 = StateTtlConfig.newBuilder(Time.minutes(30)).useProcessingTime().build();
        ListStateDescriptor<EventBean> con2StateDesc =
                new ListStateDescriptor<>("con2_state", EventBean.class);
        con1StateDesc.enableTimeToLive(ttlConfig2);
        con2State = runtimeContext.getListState(con2StateDesc);

        // 创建hbase客户端
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "doitedu:2181");
        Connection conn = ConnectionFactory.createConnection(config);
        table = conn.getTable(TableName.valueOf("user_profile"));
    }


    @Override
    public void calc(
            EventBean eventBean,
            Collector<RtmkMessage> out
    ) throws Exception {
        // 进行规则所需要的实时画像标签的计算
        // 实时画像条件：  item_share 发生次数>=2 ;  item_like 发生次数>=1
        if (eventBean.getEventId().equals("item_share")) con1State.add(eventBean);
        if (eventBean.getEventId().equals("item_like")) con2State.add(eventBean);

        // 判断是否是预定义的营销规则的触发事件 ：加购事件（加购的商品价格>100)
        if (eventBean.getEventId().equals(ruleParamBean.getFireEventId()) && Double.parseDouble(eventBean.getProperties().get(ruleParamBean.getFileEventPropertyKey())) > ruleParamBean.getFileEventPropertyValue()) {
            log.warn("用户:{},触发了规则：{}" ,eventBean.getUserId(),ruleParamBean.getRuleId());

            // 判断该事件的行为人是否满足营销规则中定义的判断条件
            // 离线静态画像条件：从离线画像库中可以查询到的条件
            //  age : (20,40]
            //  gender: male
            //  半年内月平均消费额(tg03): > 1000
            Get get = new Get(Bytes.toBytes(eventBean.getUserId()));
            Result result = table.get(get);

            int age = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")));
            String gender = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("gender")));
            int tg03 = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("tg03")));

            if (age > ruleParamBean.getAgeConditionMin() && age <= ruleParamBean.getAgeConditionMax() && gender.equals(ruleParamBean.getGenderCondition()) && tg03 > ruleParamBean.getTg03Condition()) {
                //  实时动态画像条件：
                //  最近 5分钟发生过 ： 分享行为 2次以上
                //  最近 30分钟发生过 ：点赞行为 1次以上
                int con1Count = 0;
                for (EventBean bean : con1State.get()) {
                    con1Count++;
                }
                if (con1Count > ruleParamBean.getDynamicConditionValue1()) {
                    int con2Count = 0;
                    for (EventBean bean : con2State.get()) {
                        con2Count++;
                    }
                    if (con2Count > ruleParamBean.getDynamicConditionvalue2()) {
                        // 走到这里，就所有的条件（包含离线静态画像标签条件和 实时动态画像标签条件）都满足了
                        log.warn("用户: {},时间: {} ,命中了规则: {}",eventBean.getUserId(),eventBean.getTimeStamp(),ruleParamBean.getRuleId());
                        out.collect(new RtmkMessage(eventBean.getUserId(), eventBean.getTimeStamp(), ruleParamBean.ruleId));
                    }
                }
            }
        }
    }
}












